Remaining questions

  • Are there better approaches to allocating the table? (i.e., setting size correctly from start)
  • I see 24 threads for python 3.4 on my macbook retina, but pegged at 100% We should be saturating the disk (i.e., <100% CPU)

In [1]:
from zipfile import ZipFile
from datetime import datetime

from pytz import timezone
import numpy as np
from numpy.lib import recfunctions
import tables as tb

In [2]:
fname = '../local_data/EQY_US_ALL_BBO_20140206.zip'
fields of dailyquotes file taqquote
[0:8]HHMMSSXXX
[9] text EXCHANGE N Nyse  T/Q NASDAQ
[10:25] text symbol 6+10
[26:36] bid price 7+4
[37:43] bid size (units)
[44:54] ask price 7+4
[55:61] ask size
[62] text Condition of quote
[63:66] market maker
[67] bid exchange
[68] ask aexchange
[69:84] int seqno
[85] int bbo indicator
[86] int NASDAQ BBO indocator
[87] text cancel/correction
[88] text C=CTA N=UTP
[90] text Retail interest indicator
[...]

Read this in

# Two characters are also used at the end of each line as a line indicator
widths = [9, 1, 16, 11, 7, 11, 7, 1, 4, 1, 1, 16, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

w = np.array(widths)
w.cumsum()

Note that we'll have 98 bytes total with the \r\n on the end.


In [3]:
BYTES_PER_LINE = 98
# Note - we're using object here (as pandas will do this anyway), 
# and we'll need to convert back to fixed width strings later
# We can get the widths from the widths list above
old_dtype = [('Time', np.datetime64),
         ('Exchange', object),  # |S1
         ('Symbol', object),  # |S16, etc.
         ('Bid_Price', np.float64),
         ('Bid_Size', np.int32),
         ('Ask_Price', np.float64),
         ('Ask_Size', np.int32),
         ('Quote_Condition', object),
         ('Market_Maker', np.int), # This is blank - want to skip?
         ('Bid_Exchange', object),
         ('Ask_Exchange', object),
         ('Sequence_Number', np.int64),
         ('National_BBO_Ind', np.int8), # These aren't really numbers
         ('National_BBO_Ind', np.int8), # Maybe should be string?
         ('Quote_Cancel_Correction', object),
         ('Source_of_Quote', object),
         ('Retail_Interest_Indicator_RPI', object),
         ('Short_Sale_Restriction_Indicator', object),
         ('LULD_BBO_Indicator_CQS', object),
         ('LULD_BBO_Indicator_UTP', object),
         ('FINRA_ADF_MPID_Indicator', object),
         ('SIP_generated_Message_Identifier', object),
         ('National_BBO_LULD_Indicator', object)
        ] # Then there's two characters for newline

# This was for pandas' screwball approach to dtype
# names = [a for a,b in dtype]
# dtype = dict(dtype)

In [4]:
# Note that the '|' character means byte order doesn't matter, 
# which it never will for "bytes" (which is what 'S' stands for)
initial_dtype = [('Time', 'S9'),  # HHMMSSmmm, should be in Eastern Time (ET)
                 # ('hour', '|S2'),
                 # ('minute', '|S2'),
                 # ('second', '|S2'),
                 # ('msec', '|S3'),
                 ('Exchange', 'S1'),
                 ('Symbol', 'S16'),  # Maybe should split into 6 root + 10 suffix
                 ('Bid_Price', 'S11'),  # 7.4 (fixed point)
                 ('Bid_Size', 'S7'),
                 ('Ask_Price', 'S11'),  # 7.4
                 ('Ask_Size', 'S7'),
                 ('Quote_Condition', 'S1'),
                 ('Market_Maker', 'S4'), # This ends up getting discarded, it should always be b'    '
                 ('Bid_Exchange', 'S1'),
                 ('Ask_Exchange', 'S1'),
                 ('Sequence_Number', 'S16'),
                 ('National_BBO_Ind', 'S1'),
                 ('NASDAQ_BBO_Ind', 'S1'),
                 ('Quote_Cancel_Correction', 'S1'),
                 ('Source_of_Quote', 'S1'),
                 ('Retail_Interest_Indicator_RPI', 'S1'),
                 ('Short_Sale_Restriction_Indicator', 'S1'),
                 ('LULD_BBO_Indicator_CQS', 'S1'),
                 ('LULD_BBO_Indicator_UTP', 'S1'),
                 ('FINRA_ADF_MPID_Indicator', 'S1'),
                 ('SIP_generated_Message_Identifier', 'S1'),
                 ('National_BBO_LULD_Indicator', 'S1'),
                 ('newline', 'S2')]

In [5]:
# Justin and Pandas (I think) use time64, as does PyTables.
# We could use msec from beginning of day for now in an int16
# (maybe compare performance to datetime64? But dates should compress very well...)
time_col = 'Time'

convert_dtype = [
               ('Bid_Price', np.float64),
               ('Bid_Size', np.int32),
               ('Ask_Price', np.float64),
               ('Ask_Size', np.int32),
               # ('Market_Maker', np.int8),  # This is not currently used, and should always be b'    '
               ('Sequence_Number', np.int64),
               # ('National_BBO_Ind', np.int8),  # The _Ind fields are actually categorical - leaving as strings
               # ('NASDAQ_BBO_Ind', np.int8),
              ]

passthrough_strings = ['Exchange',
                     'Symbol',
                     'Quote_Condition',
                     'Bid_Exchange',
                     'Ask_Exchange',
                     'National_BBO_Ind',  # The _Ind fields are actually categorical - leaving as strings
                     'NASDAQ_BBO_Ind',
                     'Quote_Cancel_Correction',
                     'Source_of_Quote',
                     'Retail_Interest_Indicator_RPI',
                     'Short_Sale_Restriction_Indicator',
                     'LULD_BBO_Indicator_CQS',
                     'LULD_BBO_Indicator_UTP',
                     'FINRA_ADF_MPID_Indicator',
                     'SIP_generated_Message_Identifier',
                     'National_BBO_LULD_Indicator']

In [6]:
# Lifted from blaze.pytables
def dtype_to_pytables(dtype):
    """ Convert NumPy dtype to PyTable descriptor
    Examples
    --------
    >>> from tables import Int32Col, StringCol, Time64Col
    >>> dt = np.dtype([('name', 'S7'), ('amount', 'i4'), ('time', 'M8[us]')])
    >>> dtype_to_pytables(dt)  # doctest: +SKIP
    {'amount': Int32Col(shape=(), dflt=0, pos=1),
     'name': StringCol(itemsize=7, shape=(), dflt='', pos=0),
     'time': Time64Col(shape=(), dflt=0.0, pos=2)}
    """
    d = {}
    for pos, name in enumerate(dtype.names):
        dt, _ = dtype.fields[name]
        if issubclass(dt.type, np.datetime64):
            tdtype = tb.Description({name: tb.Time64Col(pos=pos)}),
        else:
            tdtype = tb.descr_from_dtype(np.dtype([(name, dt)]))
        el = tdtype[0]  # removed dependency on toolz -DJC
        getattr(el, name)._v_pos = pos
        d.update(el._v_colobjects)
    return d

In [7]:
# The "easy" dtypes are the "not datetime" dtypes
easy_dtype = []
convert_dict = dict(convert_dtype)

for name, dtype in initial_dtype:
    if name in convert_dict:
        easy_dtype.append( (name, convert_dict[name]) )
    elif name in passthrough_strings:
        easy_dtype.append( (name, dtype) )

# PyTables will not accept np.datetime64, we hack below, but we use it to work with the blaze
# function above.
# We also shift Time to the end (while I'd rather maintain order), as it's more efficient for Dav
# given the technical debt he's already built up.
pytables_dtype = easy_dtype + [('Time', 'datetime64[ms]')]

pytables_desc = dtype_to_pytables(np.dtype(pytables_dtype))

In [8]:
class TAQ2HDF5:
    
    def __init__(self, taq_fname):
        self.taq_fname = taq_fname
    
    def convert_taq(self):
        # The below doesn't work for pandas (and neither does `unzip` from the command line). Probably want to use
        # something like `7z x -so my_file.zip 2> /dev/null` if we use pandas.
        with ZipFile(self.taq_fname) as zfile:
            for inside_f in zfile.filelist:
                # The original filename is available as inside_f.filename
                with zfile.open(inside_f.filename) as infile:
                    first = infile.readline()

                    # You need to use bytes to split bytes
                    dateish, numlines = first.split(b":")
                    numlines = int(numlines)
                    
                    # Get dates to combine with times later
                    # This is a little over-trusting of the spec...
                    self.month = int(dateish[2:4])
                    self.day = int(dateish[4:6])
                    self.year = int(dateish[6:10])

                    # Should I use a context manager here?
                    h5_table = self.setup_hdf5(inside_f.filename, numlines)
                    try:
                        self.raw_conversion(numlines, infile, h5_table)
                    finally:
                        self.finalize_hdf5()
                    
    def setup_hdf5(self, h5_fname_root, numlines):
        # We're using aggressive compression and checksums, since this will likely stick around
        # Stopping one level short of max compression - don't be greedy.
        self.h5 = tb.open_file(h5_fname_root + '.h5', title=h5_fname_root, mode='w', 
                          filters=tb.Filters(complevel=8, complib='blosc:lz4hc', fletcher32=True) )
                     
        return self.h5.create_table('/', 'daily_quotes', description=pytables_desc, expectedrows=numlines)
    
                          
    def finalize_hdf5(self):
        self.h5.close()

    def process_chunk(self, all_strings):
        # This is unnecessary copying
        easy_converted = all_strings.astype(easy_dtype)
        
        # These don't have the decimal point in the TAQ file
        for dollar_col in ['Bid_Price', 'Ask_Price']:
            easy_converted[dollar_col] /= 10000
        
        # Currently, there doesn't seem to be any utility to converting to numpy.datetime64
        # PyTables wants float64's corresponding to the POSIX Standar (relative to 1970-01-01, UTC)
        converted_time = [datetime( self.year, self.month, self.day, 
                                     int(raw[:2]), int(raw[2:4]), int(raw[4:6]),
                                     int(raw[6:9]) * 1000,  # msec needs to be microsec 
                                     tzinfo=timezone('US/Eastern') ).timestamp()
                          for raw in all_strings['Time'] ]

        
        # More unnecessary copying
        records = recfunctions.append_fields(easy_converted, 'Time', converted_time, usemask=False)
        
        return records
        
        
    # at some point, we might optimize chunksize. For now, assume PyTables is smart
    def raw_conversion(self, numlines, infile, out, chunksize=None):
        '''Read raw bytes from TAQ, write to HDF5'''
        index = 0
        
        if chunksize is None:
            chunksize = out.chunkshape[0]
        
        while(True):
            raw_bytes = infile.read(BYTES_PER_LINE * chunksize)
            
            ## Break after 10 lines
            index = index + 1
            if index == 10:
                break
                
            if not raw_bytes:
                break
            # If we use asarray with this dtype, it crashes Python! (might not be true anymore)
            # ndarray gives 'S' arrays instead of chararrays (as recarray does)
            all_strings = np.ndarray(chunksize, buffer=raw_bytes, dtype=initial_dtype)

            # This approach doesn't work...
            # out[chunk_start:chunk_stop, 1:] = all_strings[:,1:-1]
            
            out.append( self.process_chunk(all_strings) )

In [10]:
i = 0
with ZipFile(fname) as zfile:
    for inside_f in zfile.filelist:
        with zfile.open(inside_f.filename) as infile:
            first = infile.readline() # I dont want to print the first line
            while i < 5:
                first = infile.readline()
                i = i + 1

                dateish = first.split()
                print(dateish)    # split 
                #print(dateish[0]) # the first chunck node              
                
                hour = dateish[0][0:2]
                month = dateish[0][2:4]
                second = dateish[0][4:6]
                mesc = dateish[0][6:9]
                print(month, hour, ":", second, mesc)


[b'040000901PA', b'000000000000000000000000000000000000R', b'PP000000000000001422', b'C']
b'00' b'04' : b'00' b'901'
[b'075300081PA', b'000000000000000000000007294000000027R', b'PP000000000007625512', b'C']
b'53' b'07' : b'00' b'081'
[b'075300085PA', b'000000000000000000000006076000000010R', b'PP000000000007625612', b'C']
b'53' b'07' : b'00' b'085'
[b'075300089PA', b'000004190000000027000006076000000010R', b'PP000000000007625712', b'C']
b'53' b'07' : b'00' b'089'
[b'075300094PA', b'000005407000000027000006076000000010R', b'PP000000000007625812', b'C']
b'53' b'07' : b'00' b'094'

In [ ]:
class read:
     def __init__(self, taq_fname):
        self.taq_fname = taq_fname
    
    def convert_taq(self):
        # The below doesn't work for pandas (and neither does `unzip` from the command line). Probably want to use
        # something like `7z x -so my_file.zip 2> /dev/null` if we use pandas
        i = 0
        with ZipFile(self.taq_fname) as zfile:
            for inside_f in zfile.filelist:
                with zfile.open(inside_f.filename) as infile:
                    first = infile.readline() # I dont want to print the first line
                    while i < 1:
                        first = infile.readline()
                        i = i + 1

                        dateish = first.split()
                        print(dateish)    # split      
                
                        hour = dateish[0][0:2]
                        month = dateish[0][2:4]
                        second = dateish[0][4:6]
                        mesc = dateish[0][6:9]
                        print(hour, month, second, mesc)

                    # Should I use a context manager here?
                    h5_table = self.setup_hdf5(inside_f.filename, numlines)
                    try:
                        self.raw_conversion(numlines, infile, h5_table)
                    finally:
                        self.finalize_hdf5()
                        
    def setup_hdf5(self, h5_fname_root, numlines):
        # We're using aggressive compression and checksums, since this will likely stick around
        # Stopping one level short of max compression - don't be greedy.
        self.h5 = tb.open_file(h5_fname_root + '.h5', title=h5_fname_root, mode='w', 
                          filters=tb.Filters(complevel=8, complib='blosc:lz4hc', fletcher32=True) )
                     
        return self.h5.create_table('/', 'daily_quotes', description=pytables_desc, expectedrows=numlines)
    
                          
    def finalize_hdf5(self):
        self.h5.close()

    def process_chunk(self, all_strings):
        # This is unnecessary copying
        easy_converted = all_strings.astype(easy_dtype)
        
        # These don't have the decimal point in the TAQ file
        for dollar_col in ['Bid_Price', 'Ask_Price']:
            easy_converted[dollar_col] /= 10000
        
        # Currently, there doesn't seem to be any utility to converting to numpy.datetime64
        # PyTables wants float64's corresponding to the POSIX Standar (relative to 1970-01-01, UTC)
        converted_time = [datetime( self.year, self.month, self.day, 
                                     int(raw[:2]), int(raw[2:4]), int(raw[4:6]),
                                     int(raw[6:9]) * 1000,  # msec needs to be microsec 
                                     tzinfo=timezone('US/Eastern') ).timestamp()
                          for raw in all_strings['Time'] ]

        
        # More unnecessary copying
        records = recfunctions.append_fields(easy_converted, 'Time', converted_time, usemask=False)
        
        return records
        
        
    # at some point, we might optimize chunksize. For now, assume PyTables is smart
    def raw_conversion(self, numlines, infile, out, chunksize=None):
        '''Read raw bytes from TAQ, write to HDF5'''
        index = 0
        
        if chunksize is None:
            chunksize = out.chunkshape[0]
        
        while(True):
            raw_bytes = infile.read(BYTES_PER_LINE * chunksize)
                           
            if not raw_bytes:
                break
            # If we use asarray with this dtype, it crashes Python! (might not be true anymore)
            # ndarray gives 'S' arrays instead of chararrays (as recarray does)
            all_strings = np.ndarray(chunksize, buffer=raw_bytes, dtype=initial_dtype)

            # This approach doesn't work...
            # out[chunk_start:chunk_stop, 1:] = all_strings[:,1:-1]
            
            out.append( self.process_chunk(all_strings) )

Let's process our file


In [10]:
test_run = TAQ2HDF5(fname)

In [11]:
%time test_run.convert_taq()


Wall time: 2.69 s

In [12]:
h5 = tb.open_file('./taqquote20140206.h5')

In [16]:
h5.root.daily_quotes[:]


Out[16]:
array([ (b'P', b'A               ', 0.0, 0, 0.0, 0, b'R', b'P', b'P', 14, b'2', b'2', b' ', b'C', b' ', b' ', b' ', b' ', b' ', b' ', b' ', 1391676960.901),
       (b'P', b'A               ', 0.0, 0, 72.94, 27, b'R', b'P', b'P', 76255, b'1', b'2', b' ', b'C', b' ', b' ', b' ', b' ', b' ', b' ', b' ', 1391690940.081),
       (b'P', b'A               ', 0.0, 0, 60.76, 10, b'R', b'P', b'P', 76256, b'1', b'2', b' ', b'C', b' ', b' ', b' ', b' ', b' ', b' ', b' ', 1391690940.085),
       ...,
       (b'N', b'A               ', 58.17, 4, 58.19, 7, b'R', b'N', b'N', 21017149, b'6', b'2', b'A', b'C', b'A', b' ', b' ', b' ', b' ', b' ', b'A', 1391717739.258),
       (b'N', b'A               ', 58.17, 5, 58.19, 7, b'R', b'N', b'N', 21017154, b'0', b'2', b'A', b'C', b'A', b' ', b' ', b' ', b' ', b' ', b'A', 1391717739.258),
       (b'B', b'A               ', 58.17, 1, 58.2, 1, b'R', b'B', b'B', 21017424, b'0', b'2', b' ', b'C', b' ', b' ', b' ', b' ', b' ', b' ', b' ', 1391717739.419)], 
      dtype=[('Exchange', 'S1'), ('Symbol', 'S16'), ('Bid_Price', '<f8'), ('Bid_Size', '<i4'), ('Ask_Price', '<f8'), ('Ask_Size', '<i4'), ('Quote_Condition', 'S1'), ('Bid_Exchange', 'S1'), ('Ask_Exchange', 'S1'), ('Sequence_Number', '<i8'), ('National_BBO_Ind', 'S1'), ('NASDAQ_BBO_Ind', 'S1'), ('Quote_Cancel_Correction', 'S1'), ('Source_of_Quote', 'S1'), ('Retail_Interest_Indicator_RPI', 'S1'), ('Short_Sale_Restriction_Indicator', 'S1'), ('LULD_BBO_Indicator_CQS', 'S1'), ('LULD_BBO_Indicator_UTP', 'S1'), ('FINRA_ADF_MPID_Indicator', 'S1'), ('SIP_generated_Message_Identifier', 'S1'), ('National_BBO_LULD_Indicator', 'S1'), ('Time', '<f8')])

In [17]:
h5.close()

In [24]:



Out[24]:
<__main__.test at 0x82aa320>

In [ ]: